C++11 并发学习(三)


学习自大神博客

C++11 ThreadPool 简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <queue>
#include <type_traits>
#include <utility>
#include <vector>
// C++11 版的 线程池
namespace zl
{
class ThreadsGuard
{
public:
ThreadsGuard(std::vector<std::thread>& v)
: threads_(v)
{
}
~ThreadsGuard()
{
for (size_t i = 0; i != threads_.size(); ++i)
{
if (threads_[i].joinable())
{
threads_[i].join();
}
}
}
private:
ThreadsGuard(ThreadsGuard&& tg) = delete;
ThreadsGuard& operator = (ThreadsGuard&& tg) = delete;
ThreadsGuard(const ThreadsGuard&) = delete;
ThreadsGuard& operator = (const ThreadsGuard&) = delete;
private:
std::vector<std::thread>& threads_;
};
class ThreadPool
{
public:
typedef std::function<void()> task_type;
public:
explicit ThreadPool(int n = 0);
~ThreadPool()
{
stop();
cond_.notify_all();
}
void stop()
{
stop_.store(true, std::memory_order_release);
}
template<class Function, class... Args>
std::future<typename std::result_of<Function(Args...)>::type> add(Function&&, Args&&...);
private:
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator = (ThreadPool&&) = delete;
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator = (const ThreadPool&) = delete;
private:
std::atomic<bool> stop_;
std::mutex mtx_;
std::condition_variable cond_;
std::queue<task_type> tasks_;
std::vector<std::thread> threads_;
zl::ThreadsGuard tg_;
};
inline ThreadPool::ThreadPool(int n)
: stop_(false)
, tg_(threads_)
{
int nthreads = n;
if (nthreads <= 0)
{
nthreads = std::thread::hardware_concurrency();
nthreads = (nthreads == 0 ? 2 : nthreads);
}
for (int i = 0; i != nthreads; ++i)
{
threads_.push_back(std::thread([this]{
while (!stop_.load(std::memory_order_acquire))
{
task_type task;
{
std::unique_lock<std::mutex> ulk(this->mtx_);
this->cond_.wait(ulk, [this]{ return stop_.load(std::memory_order_acquire) || !this->tasks_.empty(); });
if (stop_.load(std::memory_order_acquire))
return;
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
task();
}
}));
}
}
template<class Function, class... Args>
std::future<typename std::result_of<Function(Args...)>::type>
ThreadPool::add(Function&& fcn, Args&&... args)
{
typedef typename std::result_of<Function(Args...)>::type return_type;
typedef std::packaged_task<return_type()> task;
auto t = std::make_shared<task>(std::bind(std::forward<Function>(fcn), std::forward<Args>(args)...));
auto ret = t->get_future();
{
std::lock_guard<std::mutex> lg(mtx_);
if (stop_.load(std::memory_order_acquire))
throw std::runtime_error("thread pool has stopped");
tasks_.emplace([t]{(*t)(); });
}
cond_.notify_one();
return ret;
}
}
#endif /* THREAD_POOL_H */

线程池的具体思路就不多说了,我们主要说一下用到的 c++11 的部分

首先是建立了一个 std::vector< std::thread>& v 的容器

定义了一个原子操作 std::atomic< bool> stop_ 对bool进行了原子性封装

然后不仅用到了 std::lock_guard,还用了相比其提供了更好的上锁和解锁控制的 std::unique_lock。

这里主要介绍一下操作更为丰富的 std::unique_lock

顾名思义,unique_lock 对象以独占所有权的方式(unique owership)管理 mutex 对象的上锁和解锁操作,所谓独占所有权,就是没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权。

①. 上锁/解锁操作:lock,try_lock,try_lock_for,try_lock_until 和 unlock

②. 修改操作:移动赋值(move assignment)(前面已经介绍过了),交换(swap)(与另一个 std::unique_lock 对象交换它们所管理的 Mutex 对象的所有权),释放(release)(返回指向它所管理的 Mutex 对象的指针,并释放所有权)

③. 获取属性操作:owns_lock(返回当前 std::unique_lock 对象是否获得了锁)、operator bool()(与 owns_lock 功能相同,返回当前 std::unique_lock 对象是否获得了锁)、mutex(返回当前 std::unique_lock 对象所管理的 Mutex 对象的指针)。

其他

感觉这个例子还是用到挺多 C++11 的特性,在这里还是得一一解释一下(只是简单解释一下,关于C++还有很多要学的,我目前也只是个菜鸡啊。。。 C++ 太棒了!特性多到你学不完→→手动滑稽):

首先是std::function

简单的来说就是:

通过std::function对 C++ 中各种可调用实体(普通函数、Lambda表达式、函数指针、以及其它函数对象等)的封装,形成一个新的可调用的std::function对象;让我们不再纠结那么多的可调用实体。一切变的简单粗暴。

其实就是以前我们用函数指针来统一处理不同的函数对象类型,现在我们可以使用更安全的std::function来完成这些任务。

在上述例子中,使用它来定义了一个任务类型对象

typedef std::function< void()> task_type;

更为深入地学习,可以看这篇博客

std::memory_order

C++11 中规定了6种访存次序(Memory Order),知乎上给出了通俗的解释

  • momory_order_relaxed,
  • memory_order_consume,
  • memory_order_acquire,
  • memory_order_release,
  • memory_order_acq_rel,
  • memory_order_seq_cst.

虽然有6个选项,其实表示的是三种内存模型

  • sequential consistent(memory_order_seq_cst)

  • relaxed(memory_order_seq_cst)

  • acquire release(memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel)

我们这里用到的是获取-释放次序 acquire release

目的就是限制两个来自不同线程的原子操作的顺序,具体的操作就是需要两个线程进行一下同步,同步对一个变量的读写操作…

①. 原子 load 操作:

while (!stop_.load(std::memory_order_acquire))

②. 原子 store 操作:

std::memory_order_release

std::result_of

其实这个就是和我们之前的auto decltype都是属于自动类型推导,主要是使我们的代码可读易维护

我们这里用的
std::future< typename std::result_of< Function(Args…)>::type> add(Function&&, Args&&…);
就是为了获取function的返回值

result_of 其实就是通过decltype来推导函数的返回类型

std::move

在C++11中,标准库在< utility>中提供了一个有用的函数std::move,这个函数的名字具有迷惑性,因为实际上std::move并不能移动任何东西,它唯一的功能是将一个左值强制转化为右值引用,继而我们可以通过右值引用使用该值,以用于移动语义。从实现上讲,std::move基本等同于一个类型转换:
static_cast< T&&>(lvalue);

值得一提的是,被转化的左值,其生命期并没有随着左右值的转化而改变。如果读者期望std::move转化的左值变量lvalue能立即被析构,那么肯定会失望了。

std::packaged_task

std::packaged_task它包装了一个可调用的目标(如function, lambda expression, bind expression, or another function object),以便异步调用,它和promise在某种程度上有点像,promise保存了一个共享状态的值,而packaged_task保存的是一个函数

typedef std::packaged_task< return_type()> task;

std::make_shared

C++11 中引入了智能指针, 同时还有一个模板函数 std::make_shared 可以返回一个指定类型的 std::shared_ptr:

struct A;
std::shared_ptr< A> p1 = std::make_shared< A>();
std::shared_ptr< A> p2(new A);

①. 如果是使用 new

auto p = new widget();
shared_ptr sp1{ p }, sp2{ sp1 };
需要两次内存分配

②. 然而使用 make_shared 的话:

auto sp1 = make_shared(), sp2{ sp1 };

所以我们这里 std::make_shared 其实就是为了代替 new
auto t = std::make_shared< task>(std::bind(std::forward< Function>(fcn), std::forward< Args>(args)…));

想要进一步了解它与std::shared_ptr的构造函数比较的戳

总结一些我们需要了解的就是:

①. 同直接使用new相比,make函数减小了代码重复,提高的异常安全,并且对于std::make_shared和std::allcoated_shared,生成的代码会更小更快。

②. 不能使用make函数的情况包括我们需要定制删除器和期望直接传递大括号初始化器。

③. 对于std::shared_ptr,额外的不建议使用make函数的情况包括:
(1)定制内存管理的类
(2)关注内存的系统,非常大的对象,以及生存期比 std::shared_ptr长的std::weak_ptr

下一篇我们将会实现C++11多线程下各种生产者消费者模型